package com.ouyunc.mq.config.kafka.strategy.impl;

import com.ouyunc.mq.config.kafka.enums.KafkaMqEnum;
import com.ouyunc.mq.config.kafka.properties.ClusterKafkaMqProperties;
import com.ouyunc.mq.config.kafka.strategy.KafkaMqStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Cluster-mode Kafka strategy. Builds the producer factory, consumer factory and
 * listener container factory beans from {@link ClusterKafkaMqProperties}. Only
 * active when {@code mq.kafka.primary} is set to {@code CLUSTER}.
 *
 * @Author fangzhenxun
 * @Date 2020/3/13 11:13
 **/
@Slf4j
@Configuration
@ConditionalOnExpression("'${mq.kafka.primary}'.equals('CLUSTER')")
public class ClusterKafkaMqStrategy implements KafkaMqStrategy {

    /**
     * Property holder for the cluster-mode Kafka configuration.
     **/
    private final ClusterKafkaMqProperties clusterKafkaMqProperties;

    /**
     * Constructor injection (rather than field injection) so the dependency is
     * final and the class can be instantiated without a Spring context in tests.
     *
     * @param clusterKafkaMqProperties cluster-mode Kafka configuration properties
     **/
    @Autowired
    public ClusterKafkaMqStrategy(ClusterKafkaMqProperties clusterKafkaMqProperties) {
        this.clusterKafkaMqProperties = clusterKafkaMqProperties;
    }

    /**
     * Identifies the mode this Kafka strategy implementation handles.
     *
     * @return {@link KafkaMqEnum#CLUSTER}
     **/
    @Override
    public KafkaMqEnum getType() {
        return KafkaMqEnum.CLUSTER;
    }

    /**
     * Builds the producer factory and registers it as a Spring bean.
     * The raw return type is kept to match the {@code KafkaMqStrategy} contract.
     *
     * @return producer factory configured from {@link #producerProperties()}
     **/
    @Bean
    @Override
    public ProducerFactory buildProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProperties());
    }

    /**
     * Assembles the Kafka producer configuration.
     * The shared bootstrap servers are used unless the producer section
     * supplies its own override.
     *
     * @return map of producer config keys to values
     **/
    private Map<String, Object> producerProperties() {
        Map<String, Object> producerPropertiesMap = new HashMap<>(16);
        // Kafka broker addresses (comma-separated); producer-level setting overrides the shared one
        producerPropertiesMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterKafkaMqProperties.getBootstrapServers());
        if (CollectionUtils.isNotEmpty(clusterKafkaMqProperties.getProducer().getBootstrapServers())) {
            producerPropertiesMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterKafkaMqProperties.getProducer().getBootstrapServers());
        }
        // Acknowledgement mode for message confirmation
        producerPropertiesMap.put(ProducerConfig.ACKS_CONFIG, clusterKafkaMqProperties.getProducer().getAck());
        // Number of messages batched per send
        producerPropertiesMap.put(ProducerConfig.BATCH_SIZE_CONFIG, clusterKafkaMqProperties.getProducer().getBatchSize());
        // Batching buffer size (e.g. 32M)
        producerPropertiesMap.put(ProducerConfig.BUFFER_MEMORY_CONFIG, clusterKafkaMqProperties.getProducer().getBufferMemory());
        // Retry count after a failed send
        producerPropertiesMap.put(ProducerConfig.RETRIES_CONFIG, clusterKafkaMqProperties.getProducer().getRetries());
        // linger.ms (throughput vs latency): the producer sends by batch, but also honors linger.ms.
        // The default 0 means no waiting, which can flush many under-filled batches and put heavy
        // pressure on network IO; a small positive value lets batches fill up before sending.
        producerPropertiesMap.put(ProducerConfig.LINGER_MS_CONFIG, clusterKafkaMqProperties.getProducer().getLingerMs());

        // Serializers for the message key and value
        producerPropertiesMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, clusterKafkaMqProperties.getProducer().getKeySerializer());
        producerPropertiesMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, clusterKafkaMqProperties.getProducer().getValueSerializer());
        return producerPropertiesMap;
    }

    /**
     * Kafka listener container factory bean; configures concurrency, batch
     * consumption, poll timeout and the offset ack mode.
     *
     * @return listener container factory backed by {@link #consumerFactory()}
     **/
    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<?> multiListenerContainer() {
        ConcurrentKafkaListenerContainerFactory<String, String> ckcFactory = new ConcurrentKafkaListenerContainerFactory<>();
        // Wire in the consumer factory
        ckcFactory.setConsumerFactory(consumerFactory());
        // Whether records are consumed in batches
        ckcFactory.setBatchListener(clusterKafkaMqProperties.getListener().getBatchListener());
        // Number of concurrent consumer threads
        ckcFactory.setConcurrency(clusterKafkaMqProperties.getListener().getConcurrency());
        // If the queue is empty, wait pollTimeout ms before calling poll() again.
        // If records are available they are consumed immediately; the amount fetched
        // per poll is bounded by max.poll.records. Not needed for manual commits.
        ckcFactory.getContainerProperties().setPollTimeout(clusterKafkaMqProperties.getListener().getPollTimeout());
        // Offset commit mode: MANUAL_IMMEDIATE commits per record; MANUAL commits per batch
        ckcFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(clusterKafkaMqProperties.getListener().getAckMode()));
        return ckcFactory;
    }

    /**
     * Consumer factory bean built from {@link #consumerProperties()}.
     *
     * @return consumer factory with String keys and String values
     **/
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    /**
     * Assembles the Kafka consumer configuration.
     * The shared bootstrap servers are used unless the consumer section
     * supplies its own override.
     *
     * @return map of consumer config keys to values
     **/
    private Map<String, Object> consumerProperties() {
        Map<String, Object> consumerPropertiesMap = new HashMap<>(16);
        // Kafka broker addresses; consumer-level setting overrides the shared one
        consumerPropertiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterKafkaMqProperties.getBootstrapServers());
        if (CollectionUtils.isNotEmpty(clusterKafkaMqProperties.getConsumer().getBootstrapServers())) {
            consumerPropertiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterKafkaMqProperties.getConsumer().getBootstrapServers());
        }
        // Consumer group id
        consumerPropertiesMap.put(ConsumerConfig.GROUP_ID_CONFIG, clusterKafkaMqProperties.getConsumer().getGroupId());
        // Whether offsets are committed automatically
        consumerPropertiesMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, clusterKafkaMqProperties.getConsumer().getEnableAutoCommit());
        // Maximum records fetched in one poll (batch consumption)
        consumerPropertiesMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, clusterKafkaMqProperties.getConsumer().getMaxPollRecords());
        // Where to start when no committed offset exists (e.g. earliest)
        consumerPropertiesMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, clusterKafkaMqProperties.getConsumer().getAutoOffsetReset());
        // Session timeout, e.g. 20000 ms
        consumerPropertiesMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, clusterKafkaMqProperties.getConsumer().getSessionTimeoutMs());
        // Maximum interval between polls, default 300s (300000 ms)
        consumerPropertiesMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, clusterKafkaMqProperties.getConsumer().getMaxPollIntervalMs());
        // Maximum bytes fetched per partition, e.g. 15M (15728640)
        consumerPropertiesMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, clusterKafkaMqProperties.getConsumer().getMaxPartitionFetchBytes());
        // Interval between automatic offset commits
        consumerPropertiesMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, clusterKafkaMqProperties.getConsumer().getAutoCommitIntervalMs());
        // Deserializers for the message key and value
        consumerPropertiesMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, clusterKafkaMqProperties.getConsumer().getKeyDeserializer());
        consumerPropertiesMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, clusterKafkaMqProperties.getConsumer().getValueDeserializer());
        return consumerPropertiesMap;
    }


}
